package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DwdTableProcessFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.utils.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * DWD job that routes business-database change records from the ODS Kafka topic to DWD Kafka topics.
 *
 * <p>Data flow: web/app -> Nginx -> business server (MySQL) -> Maxwell -> Kafka (ODS) -> FlinkApp -> Kafka (DWD)
 *
 * <p>Programs: Mock -> MySQL -> Maxwell -> Kafka (ZK) -> Dwd09_BaseDbApp -> Kafka (ZK)
 *
 * <p>Routing configuration is read from {@code gmall-230315-config.table_process} via Flink CDC and
 * broadcast to the main stream; {@link DwdTableProcessFunction} filters the main stream against it.
 * The Kafka sink reads the destination topic from each output record's {@code topic} field and the
 * message value from its {@code data} field.
 */
public class Dwd09_BaseDbApp {

    public static void main(String[] args) throws Exception {

        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): parallelism 1 looks like a local-development setting; confirm for production.
        env.setParallelism(1);

        // 1.1 Enable checkpointing (currently disabled; uncomment for a fault-tolerant deployment).
//        env.enableCheckpointing(10000L);
//        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//        checkpointConfig.setCheckpointTimeout(20000L);
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
//        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        //checkpointConfig.setCheckpointInterval(10000L);
//        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
//        checkpointConfig.setMaxConcurrentCheckpoints(2);
//        // The default is Integer.MAX_VALUE.
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//        env.setStateBackend(new HashMapStateBackend());
//
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 2. Read the Kafka topic_db topic (ODS business-database changes) as a stream.
        DataStreamSource<String> kafkaDS = env.fromSource(KafkaUtil.getKafkaSource(Constant.TOPIC_ODS_DB, "base_db_app_230315"),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");

        // 3. Drop empty/malformed records and convert the rest to JSON objects (main stream).
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                // Skip null or empty payloads: a null would otherwise reach JSON.parseObject and
                // yield a null object that breaks downstream operators.
                if (value == null || value.isEmpty()) {
                    return;
                }
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    // Never emit null into the stream.
                    if (jsonObject != null) {
                        out.collect(jsonObject);
                    }
                } catch (JSONException e) {
                    // Malformed JSON ("dirty data"): log it and drop the record.
                    System.out.println("脏数据：" + value);
                }
            }
        });

        // 4. Read the routing-configuration table with Flink CDC.
        //    StartupOptions.initial(): snapshot the existing rows first, then follow the binlog.
        //    NOTE(review): credentials are hard-coded; consider moving them to configuration.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username("root")
                .password("000000")
                .databaseList("gmall-230315-config")
                .tableList("gmall-230315-config.table_process")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "mysql-source");

        // 5. Turn the configuration stream into a broadcast stream. The descriptor (String -> TableProcess)
        //    is shared with DwdTableProcessFunction so both sides address the same broadcast state.
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("bc-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastDS = mysqlDS.broadcast(mapStateDescriptor);

        // 6. Connect the main stream with the broadcast stream.
        BroadcastConnectedStream<JSONObject, String> connectedDS = jsonObjDS.connect(broadcastDS);

        // 7. Process the connected stream: filter main-stream records according to the configuration.
        SingleOutputStreamOperator<JSONObject> resultDS = connectedDS.process(new DwdTableProcessFunction(mapStateDescriptor));

        // 8. Write results to Kafka: the topic comes from each record's "topic" field and the
        //    message value is its "data" field.
        resultDS.print(">>>>>>>>>>>"); // debug output
        resultDS.sinkTo(KafkaUtil.getKafkaSink(new KafkaRecordSerializationSchema<JSONObject>() {
            @Nullable
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {
                // Encode explicitly as UTF-8: String.getBytes() without a charset uses the platform
                // default, which can corrupt non-ASCII text on hosts not defaulting to UTF-8.
                return new ProducerRecord<>(element.getString("topic"),
                        element.getString("data").getBytes(StandardCharsets.UTF_8));
            }
        }));

        // 9. Launch the job.
        env.execute("Dwd09_BaseDbApp");

    }
}
